package onion.mqtt.server.dispatcher;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttQoS;
import onion.mqtt.server.manager.SessionManager;
import onion.mqtt.server.manager.SubscribeManager;
import onion.mqtt.server.store.MessageStore;
import onion.mqtt.server.store.SessionStore;
import onion.mqtt.server.store.SubscribeStore;
import org.apache.commons.lang3.ObjectUtils;

import java.util.List;

/**
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/13
 */
public class MqttMessageDispatcher implements IMqttMessageDispatcher {

    /**
     * Dispatches retained messages by delegating to the shared publish routine.
     *
     * @param channel     channel supplied by the caller; forwarded unchanged
     * @param messageList messages to deliver; {@code null} or empty is a no-op
     */
    @Override
    public void dispatchRetainMsg(Channel channel, List<MessageStore> messageList) {
        dispatchPublishMsg(channel, messageList);
    }

    /**
     * Dispatches will messages by delegating to the shared publish routine.
     *
     * @param channel     channel supplied by the caller; forwarded unchanged
     * @param messageList messages to deliver; {@code null} or empty is a no-op
     */
    @Override
    public void dispatchWillMsg(Channel channel, List<MessageStore> messageList) {
        dispatchPublishMsg(channel, messageList);
    }

    /**
     * Distributes each message to every subscriber matching its topic.
     *
     * <p>NOTE(review): {@code channel} is not used here; every message is fanned out to all
     * subscribers returned by {@code SubscribeManager#searchSubscribe}. Confirm that this is
     * intended for retained messages, which presumably should reach only the subscribing client.
     *
     * @param channel     channel supplied by the caller (currently unused)
     * @param messageList messages to deliver; {@code null} entries are skipped
     */
    private void dispatchPublishMsg(Channel channel, List<MessageStore> messageList) {
        if (ObjectUtils.isEmpty(messageList)) {
            return;
        }
        for (MessageStore message : messageList) {
            if (message != null) {
                publishToSubscribers(message);
            }
        }
    }

    /**
     * Builds a PUBLISH packet for one message and writes it to each matching subscriber's channel.
     *
     * <p>Subscribers without a session, or whose session has no active channel, are skipped so a
     * single disconnected client cannot abort delivery to the remaining ones.
     *
     * <p>NOTE(review): no packet identifier is set on the builder; confirm how QoS 1/2 deliveries
     * obtain one.
     *
     * @param message the non-null message to deliver
     */
    private void publishToSubscribers(MessageStore message) {
        List<SubscribeStore> subscribers = SubscribeManager.getInstance().searchSubscribe(message.getTopic());
        if (subscribers == null) {
            return;
        }
        for (SubscribeStore subscriber : subscribers) {
            if (subscriber == null) {
                continue;
            }
            SessionStore session = SessionManager.getInstance().getSession(subscriber.getClientId());
            if (session == null) {
                continue;
            }
            Channel target = session.getChannel();
            if (target == null || !target.isActive()) {
                // Nothing to write to; skip before allocating a payload buffer.
                continue;
            }
            // A fresh packet (and buffer) per subscriber: the buffer is consumed by the write.
            MqttPublishMessage publishMessage = MqttMessageBuilders.publish()
                    .topicName(message.getTopic())
                    .retained(message.isRetain())
                    .qos(MqttQoS.valueOf(message.getQoS()))
                    .payload(Unpooled.buffer().writeBytes(message.getPayload()))
                    .build();
            target.writeAndFlush(publishMessage);
        }
    }
}
